feat: add framework for File Format Options #3794
Signed-off-by: Corwin Joy <[email protected]>
|
ACTION NEEDED delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. |
|
Note that fully supporting Parquet encryption requires being able to get write and read properties per-file, which is why the existing ability to set |
|
I have marked this pull request as draft. This does not compile as is; I can come back to it once it is able to compile and pass unit tests. |
@rtyler OK. It seems that when I auto-merged the main branch it introduced a build error. I have resolved this and the code is once again building and passing unit tests. |
I can see the benefit, but we really need to reduce the surface of the changes that are being introduced.
| Ok(DeltaTable::new_with_state( | ||
| this.log_store, | ||
| commit.snapshot(), | ||
| None, |
Let's not change the function signature here and in the other builders
Could you be more specific about what you are looking for? Checking the code base, I see 25 calls to this constructor, and in 10/25 of the cases, I need to pass file_format_options to maintain the needed settings. I guess I could eliminate the 15 cases where I pass None by splitting this into two named constructors...
You need to use DeltaTableConfig for this:
delta-rs/crates/core/src/table/builder.rs
Lines 30 to 58 in 18f949e
/// Configuration options for delta table
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
    /// Indicates whether DeltaTable should track files.
    /// This defaults to `true`
    ///
    /// Some append-only applications might have no need of tracking any files.
    /// Hence, DeltaTable will be loaded with significant memory reduction.
    pub require_files: bool,
    /// Controls how many files to buffer from the commit log when updating the table.
    /// This defaults to 4 * number of cpus
    ///
    /// Setting a value greater than 1 results in concurrent calls to the storage api.
    /// This can decrease latency if there are many files in the log since the
    /// last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should
    /// also be considered for optimal performance.
    pub log_buffer_size: usize,
    /// Control the number of records to read / process from the commit / checkpoint files
    /// when processing record batches.
    pub log_batch_size: usize,
    #[serde(skip_serializing, skip_deserializing)]
    #[delta(skip)]
    /// When a runtime handler is provided, all IO tasks are spawn in that handle
    pub io_runtime: Option<IORuntime>,
}
OK. I think you are suggesting that I add file_format_options to the config member of DeltaTable.
In fact, that was the approach I tried initially but I had to back that out and add this as a direct member for the following reasons:
- The file format options don't really seem to fit properly into DeltaTableConfig since these options seem to be more about managing the logfile.
- We need to preserve the formatting options when going from DeltaTable to DeltaOps and back. Right now, the config gets lost when new_with_state is called. See below where the config gets reset to default:
delta-rs/crates/core/src/table/mod.rs
Line 128 in 18f949e
pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {
We need to preserve these settings throughout any chained operations. This means we still need an extra parameter in new_with_state to preserve any existing config. Also, I felt it was cleaner to create a new entry and directly set and pass it. Possibly we could move this into DeltaTableConfig, but I think this may be more of a hindrance than a help.
@adamreeve It was a couple of months ago that we moved this, do you remember anything else from our discussion? See commit below:
666f0ba
- Correct, but they are related to loading the table, which is where the file format options (encryption) should belong as well
- DeltaTableConfig tags along with the snapshot:
delta-rs/crates/core/src/table/state.rs
Lines 73 to 76 in a61ac16
/// Get the table config which is loaded with of the snapshot
pub fn load_config(&self) -> &DeltaTableConfig {
    self.snapshot.load_config()
}
so it shouldn't be an issue for it to stay there when you call new_with_state
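To make the point above concrete, here is a minimal sketch of a config that travels with the snapshot, so that rebuilding a table from its state keeps the file format options without an extra constructor parameter. All types here (`FileFormatOptions`, `TableConfig`, `TableState`, `Table`) are simplified, hypothetical stand-ins and not the actual delta-rs definitions; the `file_format_options` field on the config is an assumption about where the options would live.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for the delta-rs types discussed above.
#[derive(Debug, Clone, Default, PartialEq)]
struct FileFormatOptions {
    encrypted: bool,
}

#[derive(Debug, Clone, Default)]
struct TableConfig {
    require_files: bool,
    // Assumed new field: file format options stored next to the load settings.
    file_format_options: Option<Arc<FileFormatOptions>>,
}

#[derive(Debug, Clone)]
struct TableState {
    // The config is part of the state, so it follows the snapshot around.
    config: TableConfig,
}

impl TableState {
    fn load_config(&self) -> &TableConfig {
        &self.config
    }
}

struct Table {
    state: TableState,
}

impl Table {
    // The signature takes only the state: no extra options parameter is needed.
    fn new_with_state(state: TableState) -> Self {
        Table { state }
    }

    // Options are looked up through the snapshot's config when they are needed.
    fn file_format_options(&self) -> Option<&Arc<FileFormatOptions>> {
        self.state.load_config().file_format_options.as_ref()
    }
}
```

Under these assumptions, any operation that receives the state can recover the options at the point where it actually reads or writes files.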
OK. I think it makes sense to move this here. I will investigate this week. The main issue is whether I have to support serialization out of the gate to make this work / at what points config gets reconstructed from serialized properties. The reason why is that serialization of the FileFormatOptions will be a bit tricky because:
- I'm not sure if TableOptions fully supports serialization.
- For direct encryption and decryption properties, we will need to modify the serialization to make sure that passwords don't get serialized.
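As a rough illustration of the second concern, the sketch below persists only the non-secret part of a set of encryption options and requires the secret to be supplied again at runtime. The struct, its field names, and the key/value "serialization" are all hypothetical and use only the standard library; with serde, a similar effect is usually achieved with field attributes such as the `#[serde(skip_serializing, skip_deserializing)]` already used for `io_runtime` in `DeltaTableConfig`.

```rust
use std::collections::BTreeMap;

// Hypothetical options struct; the field names are illustrative only.
struct EncryptionOptions {
    kms_endpoint: String,
    footer_key: Vec<u8>, // secret: must never be persisted
}

impl EncryptionOptions {
    // Hand-written "serialization" to key/value pairs that omits the secret.
    fn to_persisted(&self) -> BTreeMap<String, String> {
        let mut out = BTreeMap::new();
        out.insert("kms_endpoint".to_string(), self.kms_endpoint.clone());
        out
    }

    // Rebuild from persisted pairs; the caller supplies the secret at runtime.
    fn from_persisted(map: &BTreeMap<String, String>, footer_key: Vec<u8>) -> Option<Self> {
        Some(Self {
            kms_endpoint: map.get("kms_endpoint")?.clone(),
            footer_key,
        })
    }
}
```

The design choice here is that deserialization alone can never produce a usable secret, which is one reason a runtime setter for these options may still be needed.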
crates/core/src/operations/delete.rs
Outdated
| #[allow(clippy::too_many_arguments)] | ||
| async fn execute_non_empty_expr( | ||
| snapshot: &DeltaTableState, | ||
| file_format_options: Option<FileFormatRef>, |
I think it would be better if this were pushed into the LoadConfig and not passed through each function
Can you elaborate a bit here? There are a lot of these different execute functions (one for every option) and I agree that it would be nice if they had a common configuration structure rather than the somewhat long list of arguments they take. That would probably be independent of this PR. For now, we have just added an additional argument to pass the needed settings through to execution.
See ping above, it needs to be DeltaTableConfig
|
@corwinjoy - awesome to see this come to fruition! Will find some time to give this a review hopefully tomorrow. At first glance one quick question. Do we see a way to "bundle" the datafusion specific stuff a bit more? It's a bit hard to keep track of all the individual flags while reviewing :) |
What we did to minimize this dependency is define an abstract There might be some ways to refine this further, but in general we've tried to isolate and abstract these file properties where possible and not require datafusion. |
|
@roeap From a user point of view, we've tried hard to make the settings as easy as possible. This can be seen in Calling |
# Conflicts:
# crates/core/src/delta_datafusion/table_provider.rs
# crates/core/src/operations/delete.rs
# crates/core/src/operations/drop_constraints.rs
# crates/core/src/operations/filesystem_check.rs
# crates/core/src/operations/load.rs
# crates/core/src/operations/merge/mod.rs
# crates/core/src/operations/mod.rs
# crates/core/src/operations/optimize.rs
# crates/core/src/operations/restore.rs
# crates/core/src/operations/update.rs
# crates/core/src/operations/write/mod.rs
# crates/core/tests/command_optimize.rs
# crates/core/tests/integration_datafusion.rs
# Conflicts:
# crates/core/src/operations/optimize.rs
|
Still working on this to move the file config to |
# Conflicts:
# crates/core/src/delta_datafusion/mod.rs
# crates/core/src/delta_datafusion/table_provider.rs
# crates/core/src/operations/delete.rs
# crates/core/src/operations/load.rs
# crates/core/src/operations/merge/mod.rs
# crates/core/src/operations/optimize.rs
# crates/core/src/operations/update.rs
# crates/core/src/operations/write/execution.rs
# crates/core/src/operations/write/mod.rs
# crates/core/tests/command_optimize.rs
|
@ion-elgreco OK. I have migrated these file options to the config property in |
| } | ||
| Ok(self) | ||
| } | ||
|
|
@ion-elgreco We could use some feedback on the best way to set the config for an existing table. In this design, we wanted to:
- Be able to set the config at runtime, not just at table construction. This is important for fields like passwords, where we will not want them to be serialized, or other options that we may want to change at runtime.
- Make the user interface easy to use.
With this design, setting the config for any operation looks like: (from crates/deltalake/examples/basic_operations_encryption.rs)
let ops = DeltaOps::try_from_uri(url).await?;
let ops = ops
.with_file_format_options(file_format_options.clone())
.await?;
We also considered a design where you could only set this via DeltaTableBuilder but I wanted to make this feature easy to use. See e.g. the following diff where this function is removed.
I've just pushed a fixed commit that handles when the load fails if the table isn't created yet: adamreeve@0d551a5
This matches the behaviour of DeltaOps::try_from_uri. Maybe there could be a DeltaOps::try_from_table or TryFrom<DeltaTable> for DeltaOps implementation to handle that scenario.
I am having a hard time understanding this use case. Why wouldn't it be enough to update the DeltaTableConfig? I would assume that without setting the encryption you couldn't load the table at all.
I'll do another review over the weekend |
Codecov Report
❌ Patch coverage is
@@            Coverage Diff             @@
##             main    #3794      +/-   ##
==========================================
- Coverage   73.99%   73.85%   -0.15%
==========================================
  Files         148      153       +5
  Lines       38904    39490     +586
  Branches    38904    39490     +586
==========================================
+ Hits        28788    29165     +377
- Misses       8850     9028     +178
- Partials     1266     1297      +31
|
| snapshot: &EagerSnapshot, | ||
| log_store: LogStoreRef, | ||
| session: &dyn Session, | ||
| file_format_options: Option<&FileFormatRef>, |
This is not needed anymore, since you can call snapshot.load_config() to access the file_format_options
| snapshot: &EagerSnapshot, | ||
| log_store: LogStoreRef, | ||
| session: &dyn Session, | ||
| file_format_options: Option<&FileFormatRef>, |
Same here: it's already available in the snapshot, so we can defer to the latest stage and grab these from the load_config
| // Add path column | ||
| used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); | ||
|
|
||
| let table_parquet_options = to_table_parquet_options_from_ffo(file_format_options); |
I'd prefer that we impl Into between these two structs here.
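A minimal sketch of what this suggestion could look like follows. The two structs are hypothetical, reduced stand-ins for the file format options and DataFusion's `TableParquetOptions`; the single `compression` field is an assumption made purely for illustration. Implementing `From` in the standard library provides the matching `Into` automatically.

```rust
// Hypothetical stand-ins; the real types carry many more settings.
#[derive(Debug, Clone, Default, PartialEq)]
struct FileFormatOptions {
    compression: String,
}

#[derive(Debug, Clone, Default, PartialEq)]
struct ParquetOptions {
    compression: String,
}

// Implementing From gives the matching Into for free.
impl From<&FileFormatOptions> for ParquetOptions {
    fn from(ffo: &FileFormatOptions) -> Self {
        ParquetOptions {
            compression: ffo.compression.clone(),
        }
    }
}

// Call sites can then combine map + Into instead of calling a named helper.
fn parquet_options_for(ffo: Option<&FileFormatOptions>) -> ParquetOptions {
    ffo.map(Into::into).unwrap_or_default()
}
```

With a conversion like this in place, a dedicated free function that only wraps a `map()` becomes unnecessary at most call sites.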
| limit: Option<usize>, | ||
| files: Option<&'a [Add]>, | ||
| config: Option<DeltaScanConfig>, | ||
| parquet_options: Option<TableParquetOptions>, |
I suggest we remove this here. We can move all this logic into DeltaScanConfigBuilder.build(); there we can introspect the TableLoadConfig and set the parquetOptions on the DeltaScanConfig.
| if let Some(format_options) = &self.config.file_format_options { | ||
| format_options.update_session(session)?; | ||
| } | ||
| let filter_expr = conjunction(filters.iter().cloned()); | ||
|
|
||
| let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), session) | ||
| .with_parquet_options( | ||
| crate::table::file_format_options::to_table_parquet_options_from_ffo( | ||
| self.config.file_format_options.as_ref(), | ||
| ), | ||
| ) |
This can be removed since this will happen inside the DeltaScanBuilder with my suggestion above
| config: DeltaScanConfig, | ||
| schema: Arc<Schema>, | ||
| files: Option<Vec<Add>>, | ||
| file_format_options: Option<FileFormatRef>, |
Same here, not needed anymore, since we can pass it through the DeltaScanConfig
| Ok((operation, metrics)) | ||
| } | ||
|
|
||
| async fn get_file_decryption_properties( |
Why can this return None as well?
| } | ||
| } | ||
|
|
||
| /// Extension trait to obtain a `WriterPropertiesBuilder` from an existing |
56.2.0 has landed so this can be removed
| } | ||
| } | ||
|
|
||
| pub fn build_writer_properties_factory_ffo( |
Do we really need a function for this 🤷
| file_format_options: Option<FileFormatRef>, | ||
| ) -> WriterPropertiesFactoryRef { | ||
| build_writer_properties_factory_ffo(file_format_options) | ||
| .unwrap_or_else(|| build_writer_properties_factory_default()) |
unwrap_or_default is more idiomatic
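For comparison, here is a small sketch of the two styles using a hypothetical `WriterSettings` type. Note the assumption: `Option::unwrap_or_default` only applies when the contained type implements `Default`. If the type in the PR (a factory reference) does not, the closest equivalent would be passing the default-building function directly to `unwrap_or_else` rather than wrapping it in a closure.

```rust
// Hypothetical settings type used only to contrast the two call styles.
#[derive(Debug, Clone, PartialEq)]
struct WriterSettings {
    max_row_group_size: usize,
}

impl Default for WriterSettings {
    fn default() -> Self {
        WriterSettings {
            max_row_group_size: 1024,
        }
    }
}

fn default_settings() -> WriterSettings {
    WriterSettings::default()
}

fn resolve_verbose(custom: Option<WriterSettings>) -> WriterSettings {
    // Redundant closure: it only forwards to a zero-argument function.
    custom.unwrap_or_else(|| default_settings())
}

fn resolve_idiomatic(custom: Option<WriterSettings>) -> WriterSettings {
    // Falls back to Default::default() when no custom settings are given.
    custom.unwrap_or_default()
}
```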
| } | ||
|
|
||
| #[cfg(feature = "datafusion")] | ||
| pub fn to_table_parquet_options_from_ffo( |
Same applies here; I don't think we need a function for just a map()
|
@corwinjoy this already looks better, but I think we can reduce the amount of line changes even more! I still have to take a better look at why we need a WriterPropertiesFactory :s but maybe you can explain it briefly for me? |
Corwin is busy travelling this week so might be slow to reply, but I can help answer this part. For some use cases it might be fine to have a single
This also aligns with the DataFusion |
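Since the explanation above is cut off, the following is only a sketch of the general idea behind a factory, based on the earlier remark that Parquet encryption needs write and read properties per file. The trait, struct names, and the `encryption_key` field are hypothetical and are not the PR's actual `WriterPropertiesFactory` API. A static implementation returns the same properties for every file, while a per-file implementation can vary them, for example by choosing a different key for each path.

```rust
use std::collections::HashMap;

// Hypothetical, reduced stand-in for Parquet writer properties.
#[derive(Debug, Clone, PartialEq)]
struct WriterProps {
    encryption_key: Option<Vec<u8>>,
}

// A factory is asked for properties once per file being written.
trait WriterPropsFactory {
    fn create(&self, file_path: &str) -> WriterProps;
}

// Simple case: one set of properties shared by every file.
struct StaticFactory {
    props: WriterProps,
}

impl WriterPropsFactory for StaticFactory {
    fn create(&self, _file_path: &str) -> WriterProps {
        self.props.clone()
    }
}

// Encryption-style case: the key depends on which file is written.
struct PerFileKeyFactory {
    keys: HashMap<String, Vec<u8>>,
}

impl WriterPropsFactory for PerFileKeyFactory {
    fn create(&self, file_path: &str) -> WriterProps {
        WriterProps {
            encryption_key: self.keys.get(file_path).cloned(),
        }
    }
}
```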
Description
This PR adds encryption support and other advanced file options to delta-rs by implementing a comprehensive framework for file format settings. The changes enable users to configure encryption settings, customize writer properties, and apply file-level formatting options when reading and writing Delta tables.
- FileFormatOptions trait and related infrastructure to handle file-specific configurations
In general, we have added a new trait called FileFormatOptions at the root DeltaTable level to unify how files within a delta table are read and written with specific formatting. The idea is that you can apply these settings once, at the top level, and then seamlessly perform any operations with the necessary settings.
This PR leverages the DataFusion TableOptions structure to support format options for multiple underlying file formats. (The idea being that delta-rs may eventually want to support storage formats beyond Parquet, such as Vortex or Lance.) Additionally, it centralizes file format options in a single, consistent location. This avoids the current difficulties where one has to separately set WriterProperties and then reader properties as part of the SessionState. (This is in line with comments from @roeap about how file configuration might be improved: #3300 (comment)). We would also like to eventually extend this upgrade to add notations about these file configurations to the delta table properties. For example, if the files are encrypted, one could add a KMS configuration for where to retrieve encryption keys.
Review Suggestion
This PR turned out to be larger than we hoped, so apologies for that, but I don't know how to split it into smaller pieces.
When reviewing, we suggest starting with the file crates/core/src/table/file_format_options.rs to get an overview of the new file format trait that can be applied to delta tables.
Related Issue(s)
Support Parquet Modular Encryption:
#3300
Documentation
Parquet Modular Encryption: https://docs.google.com/document/d/1MUg1J7u5VdLkgejJ4ybzfZt1OmwhQkq2iGPxsn4gqLI/edit?tab=t.0#heading=h.34wvmhc1zdch
Attribution
This PR was created in collaboration with @adamreeve